package com.pig4cloud.pig.ads.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
import com.pig4cloud.pig.ads.config.SdkProperties;
import com.pig4cloud.pig.ads.pig.mapper.AdAccountMapper;
import com.pig4cloud.pig.ads.pig.mapper.AdvertiserMapper;
import com.pig4cloud.pig.ads.pig.mapper.TtAccesstokenMapper;
import com.pig4cloud.pig.ads.service.TtAdReportService;
import com.pig4cloud.pig.ads.utils.OEHttpUtils;
import com.pig4cloud.pig.api.entity.AdAccount;
import com.pig4cloud.pig.api.entity.Advertising;
import com.pig4cloud.pig.api.entity.TtAccesstoken;
import com.pig4cloud.pig.api.util.Constants;
import com.pig4cloud.pig.api.vo.TtAdReportVO;
import com.pig4cloud.pig.api.vo.TtAdReportVO.ReportAdvertiser;
import com.sjda.framework.common.utils.MapUtils;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;

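/**
 * @Title TtAdReportServiceImpl.java
 * @Package com.pig4cloud.pig.ads.service.impl
 * @Author 马嘉祺
 * @Date 2021/9/6 16:34
 * @Description Pulls Toutiao (Ocean Engine) creative reports at daily and hourly granularity,
 * stores them in ClickHouse and forwards the daily rows to Kafka.
 */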
@Slf4j
@Service
@RequiredArgsConstructor
public class TtAdReportServiceImpl implements TtAdReportService {

	// QUERY_PAGE_SIZE: page size requested from the report API.
	// BATCH_SAVE_SIZE: number of fetched rows accumulated before they are handed to the batch consumer.
	private static final int QUERY_PAGE_SIZE = 200, BATCH_SAVE_SIZE = 200;

	// Divisor used to turn the percentage value "play_over_rate" into a play count.
	private static final BigDecimal PERCENTAGE_CONSTANT = BigDecimal.valueOf(100);

	// Values sent as "time_granularity" in the report request.
	public static final String CREATIVE_DAY_GRANULARITY = "STAT_TIME_GRANULARITY_DAILY";
	public static final String CREATIVE_HOUR_GRANULARITY = "STAT_TIME_GRANULARITY_HOURLY";

	// Metric names sent as "fields" in the report request.
	private static final String[] FIELDS_NAMES = {"show", "click", "cost", "play_25_feed_break", "play_50_feed_break", "play_75_feed_break", "play_100_feed_break", "play_over_rate", "valid_play", "total_play", "convert", "download_finish", "next_day_open", "next_day_open_rate", "next_day_open_cost"};

	// Fixed UTC+8 offset used to convert the local "now" into epoch milliseconds.
	private static final ZoneOffset DONGBA_DISTRICT = ZoneOffset.ofHours(8);
	private static final DateTimeFormatter DATE_FORMATTER = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd").toFormatter();
	private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();
	private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss").toFormatter();

	// Insert statement for the daily creative report; the 22 placeholders are bound in column order by syncCreativeDayList.
	private static final String CREATIVE_DAY_SAVE_SQL =
			"INSERT INTO oe_creative_day_report (" +
					"creative_id, ad_id, ad_name, ad_account, date, cost, adshow, click, adconvert, download_finish, next_day_open, next_day_open_rate, next_day_open_cost," +
					"play_25_feed_break, play_50_feed_break, play_75_feed_break, play_99_feed_break, play_100_feed_break, total_play, valid_play, createtime, iver" +
					") VALUES (" +
					"?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?" +
					")";
	// Insert statement for the hourly creative report; the 16 placeholders are bound in column order by syncCreativeHourList.
	private static final String CREATIVE_HOUR_SAVE_SQL =
			"INSERT INTO oe_creative_hour_report (" +
					"creative_id, ad_id, ad_name, ad_account, date, stat_datetime, cost, adshow, click, adconvert, download_finish, next_day_open, next_day_open_rate, next_day_open_cost, createtime, iver" +
					") VALUES (" +
					"?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?" +
					")";

	// Report endpoint; overridable through the "tt_creative_report_uri" property.
	@Value("${tt_creative_report_uri:https://ad.oceanengine.com/open_api/2/report/creative/get/}")
	private String ttCreativeReportUri;

	// NOTE(review): with @RequiredArgsConstructor, Lombok presumably only copies @Qualifier onto the generated
	// constructor parameter when lombok.copyableAnnotations is configured - confirm the intended bean is injected.
	@Qualifier("clickhouseTemplate")
	private final JdbcTemplate clickhouseTemplate;

	// Source of the stored access tokens, one row per ad account.
	private final TtAccesstokenMapper ttAccesstokenMapper;

	// Source of the advertisers attached to a housekeeper account.
	private final AdvertiserMapper advertiserMapper;

	// Producer used to forward report rows.
	private final KafkaTemplate<String, byte[]> kafkaTemplate;

	// Supplies the Kafka topic the report rows are sent to.
	private final SdkProperties sdkProperties;

	/**
	 * Synchronizes Toutiao creative reports at daily granularity.
	 * <p>
	 * Every batch of fetched rows is inserted into ClickHouse with {@code CREATIVE_DAY_SAVE_SQL} and then
	 * forwarded to the DongXin platform through Kafka in chunks of at most 200 rows.
	 *
	 * @param report sync request; may carry an explicit advertiser list and date range
	 */
	@Override
	public void syncCreativeDayList(TtAdReportVO report) {
		final LocalDateTime now = LocalDateTime.now();
		final String nowStr = now.format(DATE_TIME_FORMATTER);
		final long nowMillis = now.toInstant(DONGBA_DISTRICT).toEpochMilli();

		// Resolve the advertisers (with their access tokens) and the date range to synchronize.
		final Collection<ReportAdvertiser> advertisers = this.getAdvertiserList(report);
		final LocalDate[] dateRange = this.getDateRange(report, now);

		this.processCreativeReportList(advertisers, dateRange[0], dateRange[1], CREATIVE_DAY_GRANULARITY, (object, advertiserId) -> object.fluentPut("advertiser_id", advertiserId), list -> {
			// Persist the batch.
			clickhouseTemplate.batchUpdate(CREATIVE_DAY_SAVE_SQL, list, list.size(), (ps, creative) -> {
				int columnIdx = 0;

				final String statDate = LocalDateTime.parse(creative.getString("stat_datetime"), DATE_TIME_FORMATTER).toLocalDate().format(DATE_FORMATTER);
				// getLongValue yields 0 for an absent metric, whereas unboxing getLong(..) would throw a NullPointerException.
				final long totalPlay = creative.getLongValue("total_play");
				final BigDecimal playOverRate = ObjectUtils.defaultIfNull(creative.getBigDecimal("play_over_rate"), BigDecimal.ZERO);
				// Completed plays = play_over_rate (a percentage) * total_play / 100, rounded half up.
				final long playOver = playOverRate.multiply(BigDecimal.valueOf(totalPlay)).divide(PERCENTAGE_CONSTANT, 0, BigDecimal.ROUND_HALF_UP).longValue();
				final BigDecimal cost = ObjectUtils.defaultIfNull(creative.getBigDecimal("cost"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);
				final BigDecimal nextDayOpenRate = ObjectUtils.defaultIfNull(creative.getBigDecimal("next_day_open_rate"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);
				final BigDecimal nextDayOpenCost = ObjectUtils.defaultIfNull(creative.getBigDecimal("next_day_open_cost"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);

				ps.setString(++columnIdx, creative.getString("creative_id"));
				ps.setString(++columnIdx, creative.getString("ad_id"));
				ps.setString(++columnIdx, creative.getString("ad_name"));
				ps.setString(++columnIdx, creative.getString("advertiser_id"));
				ps.setString(++columnIdx, statDate);
				ps.setBigDecimal(++columnIdx, cost);
				ps.setLong(++columnIdx, creative.getLongValue("show"));
				ps.setLong(++columnIdx, creative.getLongValue("click"));
				ps.setLong(++columnIdx, creative.getLongValue("convert"));
				ps.setLong(++columnIdx, creative.getLongValue("download_finish"));
				ps.setLong(++columnIdx, creative.getLongValue("next_day_open"));
				ps.setBigDecimal(++columnIdx, nextDayOpenRate);
				ps.setBigDecimal(++columnIdx, nextDayOpenCost);
				ps.setLong(++columnIdx, creative.getLongValue("play_25_feed_break"));
				ps.setLong(++columnIdx, creative.getLongValue("play_50_feed_break"));
				ps.setLong(++columnIdx, creative.getLongValue("play_75_feed_break"));
				// The API's play_100_feed_break is stored in the play_99_feed_break column ...
				ps.setLong(++columnIdx, creative.getLongValue("play_100_feed_break"));
				// ... and the derived completed-play count is stored in the play_100_feed_break column.
				ps.setLong(++columnIdx, playOver);
				ps.setLong(++columnIdx, totalPlay);
				ps.setLong(++columnIdx, creative.getLongValue("valid_play"));
				ps.setString(++columnIdx, nowStr);
				ps.setLong(++columnIdx, nowMillis);

				// Write the normalized values back so that the rows forwarded below carry the same numbers as the database.
				creative.fluentPut("total_play", totalPlay).fluentPut("play_over_rate", playOverRate.setScale(3, BigDecimal.ROUND_HALF_UP)).fluentPut("cost", cost).fluentPut("next_day_open_rate", nextDayOpenRate).fluentPut("next_day_open_cost", nextDayOpenCost);
			});
			// Forward the batch to the DongXin platform.
			Lists.partition(list, 200).forEach(chunk -> reportTtAdCreativeReportDongXin(chunk, Constants.PANGU_GDT_AD_GROUP_DAY_REPORT, 200));
		});
	}

	/**
	 * Sends report rows to the DongXin platform through Kafka, splitting the rows into smaller chunks
	 * whenever the serialized payload approaches the 1048576-byte message limit.
	 * <p>
	 * The payload is measured in UTF-8 bytes rather than characters, because multi-byte text makes the
	 * character count smaller than the byte count.
	 * NOTE(review): the bytes actually sent are produced by {@code MapUtils.convertMapToByte}; the UTF-8 size of
	 * the JSON is assumed to be a close estimate of that - confirm.
	 *
	 * @param list      rows to send
	 * @param eventName event name attached to the message
	 * @param size      chunk size the caller used to build {@code list}; only an upper bound for the next split
	 */
	private void reportTtAdCreativeReportDongXin(List<?> list, String eventName, int size) {
		// Serialize once; the same string is measured and sent.
		final String payload = JSON.toJSONString(list);
		final int payloadBytes = payload.getBytes(StandardCharsets.UTF_8).length;
		if (payloadBytes >= 1048000) {
			log.info("上报动心长度：{} 超1048000 长处理", payloadBytes);
			XxlJobLogger.log("上报动心长度：{} 超1048000 长处理", payloadBytes);
			if (list.size() <= 1) {
				// A single row cannot be split any further; partitioning with size 0 would throw instead.
				log.error("上报动心单条数据长度：{} 超1048000，无法拆分，已丢弃", payloadBytes);
				XxlJobLogger.log("上报动心单条数据长度：{} 超1048000，无法拆分，已丢弃", payloadBytes);
				return;
			}
			// Halve relative to the rows actually present so that every recursion step really shrinks the chunk.
			final int half = Math.max(1, Math.min(size, list.size()) >> 1);
			Lists.partition(list, half).forEach(part -> reportTtAdCreativeReportDongXin(part, eventName, half));
			return;
		}
		Map<String, Object> map = new HashMap<>();
		map.put(Constants.DATA, payload);
		map.put(Constants.DATA_TYPE_DONGXIN_EVENTNAME, eventName);
		map.put(Constants.DATA_TYPE_DONGXIN_EVENTTIME, System.currentTimeMillis());
		kafkaTemplate.send(sdkProperties.getDongxinKafkaTopic(), MapUtils.convertMapToByte(map));
	}

	/**
	 * Synchronizes Toutiao creative reports at hourly granularity.
	 * <p>
	 * Every batch of fetched rows is inserted into ClickHouse with {@code CREATIVE_HOUR_SAVE_SQL}.
	 *
	 * @param report sync request; may carry an explicit advertiser list and date range
	 */
	@Override
	public void syncCreativeHourList(TtAdReportVO report) {
		final LocalDateTime now = LocalDateTime.now();
		final String nowStr = now.format(DATE_TIME_FORMATTER);
		final long nowMillis = now.toInstant(DONGBA_DISTRICT).toEpochMilli();

		// Resolve the advertisers (with their access tokens) and the date range to synchronize.
		final Collection<ReportAdvertiser> advertisers = this.getAdvertiserList(report);
		final LocalDate[] dateRange = this.getDateRange(report, now);

		this.processCreativeReportList(advertisers, dateRange[0], dateRange[1], CREATIVE_HOUR_GRANULARITY, (object, advertiserId) -> object.fluentPut("advertiser_id", advertiserId), list -> clickhouseTemplate.batchUpdate(CREATIVE_HOUR_SAVE_SQL, list, list.size(), (ps, creative) -> {
			int columnIdx = 0;

			final LocalDateTime statDatetime = LocalDateTime.parse(creative.getString("stat_datetime"), DATE_TIME_FORMATTER);
			final BigDecimal cost = ObjectUtils.defaultIfNull(creative.getBigDecimal("cost"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);
			final BigDecimal nextDayOpenRate = ObjectUtils.defaultIfNull(creative.getBigDecimal("next_day_open_rate"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);
			final BigDecimal nextDayOpenCost = ObjectUtils.defaultIfNull(creative.getBigDecimal("next_day_open_cost"), BigDecimal.ZERO).setScale(3, BigDecimal.ROUND_HALF_UP);

			ps.setString(++columnIdx, creative.getString("creative_id"));
			ps.setString(++columnIdx, creative.getString("ad_id"));
			ps.setString(++columnIdx, creative.getString("ad_name"));
			ps.setString(++columnIdx, creative.getString("advertiser_id"));
			// The statistic timestamp is split into a date part and a time-of-day part.
			ps.setString(++columnIdx, statDatetime.format(DATE_FORMATTER));
			ps.setString(++columnIdx, statDatetime.format(TIME_FORMATTER));
			ps.setBigDecimal(++columnIdx, cost);
			// getLongValue yields 0 for an absent metric, whereas unboxing getLong(..) would throw a NullPointerException.
			ps.setLong(++columnIdx, creative.getLongValue("show"));
			ps.setLong(++columnIdx, creative.getLongValue("click"));
			ps.setLong(++columnIdx, creative.getLongValue("convert"));
			ps.setLong(++columnIdx, creative.getLongValue("download_finish"));
			ps.setLong(++columnIdx, creative.getLongValue("next_day_open"));
			ps.setBigDecimal(++columnIdx, nextDayOpenRate);
			ps.setBigDecimal(++columnIdx, nextDayOpenCost);
			ps.setString(++columnIdx, nowStr);
			ps.setLong(++columnIdx, nowMillis);
		}));

	}

	/**
	 * Resolves the advertisers to synchronize, each paired with the access token used to query it.
	 * <p>
	 * Advertisers supplied by the request are returned as-is. Otherwise every stored access token is
	 * expanded: a token with housekeeper flag 1 contributes its own ad account, a token with flag 2
	 * contributes every non-deleted advertiser registered under that account.
	 *
	 * @param report sync request, may be {@code null}
	 * @return advertiser/token pairs
	 */
	private Collection<ReportAdvertiser> getAdvertiserList(TtAdReportVO report) {
		// An explicit advertiser list on the request wins over the database lookup.
		if (report != null && report.getAdvertisers() != null) {
			return report.getAdvertisers();
		}

		final Collection<ReportAdvertiser> resolved = new LinkedList<>();
		final List<TtAccesstoken> tokens = ttAccesstokenMapper.selectList(
				Wrappers.<TtAccesstoken>lambdaQuery()
						.select(TtAccesstoken::getAdAccount, TtAccesstoken::getAccessToken, TtAccesstoken::getHousekeeper));
		for (TtAccesstoken token : tokens) {
			if (1 == token.getHousekeeper()) {
				// The token belongs directly to the ad account.
				resolved.add(new ReportAdvertiser(token.getAdAccount(), token.getAccessToken()));
				continue;
			}
			if (2 != token.getHousekeeper()) {
				continue;
			}
			// The token belongs to a housekeeper account: fan out to the advertisers registered under it.
			final List<Advertising> managed = advertiserMapper.selectList(
					Wrappers.<Advertising>lambdaQuery()
							.select(Advertising::getAdvertiserId)
							.eq(Advertising::getDeleted, 0)
							.eq(Advertising::getHousekeeper, token.getAdAccount()));
			for (Advertising child : managed) {
				resolved.add(new ReportAdvertiser(child.getAdvertiserId(), token.getAccessToken()));
			}
		}
		return resolved;
	}

	/**
	 * Determines the inclusive date range to synchronize.
	 * <p>
	 * Later rules override earlier ones:
	 * <ol>
	 * <li>default: runs at hour 12 or earlier start two days back so that late-arriving data of the previous
	 * days is picked up again, later runs start today; the range always ends today;</li>
	 * <li>an explicit start and end date on the request (both required);</li>
	 * <li>{@code days}: the single day that many days from today;</li>
	 * <li>{@code inventory}: the whole calendar month that many months before today.</li>
	 * </ol>
	 *
	 * @param report sync request, may be {@code null}, in which case the default range is used
	 * @param now    reference time of the current run; every relative date is derived from it
	 * @return two-element array {@code {startDate, endDate}}
	 */
	private LocalDate[] getDateRange(TtAdReportVO report, LocalDateTime now) {
		final LocalDate today = now.toLocalDate();
		LocalDate startDate = now.getHour() <= 12 ? today.minusDays(2) : today;
		LocalDate endDate = today;
		if (null == report) {
			// getAdvertiserList accepts a missing request as well; fall back to the default range.
			return new LocalDate[]{startDate, endDate};
		}

		final String sDateStr = report.getStart_date(), eDateStr = report.getEnd_date();
		if (StringUtils.isNotEmpty(sDateStr) && StringUtils.isNotEmpty(eDateStr)) {
			startDate = LocalDate.parse(sDateStr, DATE_FORMATTER);
			endDate = LocalDate.parse(eDateStr, DATE_FORMATTER);
		}
		if (report.getDays() != null) {
			startDate = today.plusDays(report.getDays());
			endDate = startDate;
		}
		if (report.getInventory() != null) {
			final LocalDate targetMonth = today.minus(report.getInventory(), ChronoUnit.MONTHS);
			startDate = targetMonth.with(TemporalAdjusters.firstDayOfMonth());
			endDate = targetMonth.with(TemporalAdjusters.lastDayOfMonth());
		}
		return new LocalDate[]{startDate, endDate};
	}

	/**
	 * Pulls the creative report of every advertiser for the given date range and hands the rows to
	 * {@code consumer} in batches of {@code BATCH_SAVE_SIZE}.
	 * <p>
	 * The range is walked in windows of at most 30 days for daily and 1 day for hourly granularity. A failure
	 * while processing one advertiser is logged and does not stop the remaining advertisers or windows; the
	 * window is always advanced, so a persistent error cannot make the loop spin forever.
	 *
	 * @param advertisers     advertisers to query, each with its access token
	 * @param startDate       first day to synchronize (inclusive)
	 * @param endDate         last day to synchronize (inclusive)
	 * @param timeGranularity {@code CREATIVE_DAY_GRANULARITY} or {@code CREATIVE_HOUR_GRANULARITY}
	 * @param queryFunc       maps a raw report row and its advertiser id to the element handed to the consumer
	 * @param consumer        receives each batch; the list is cleared afterwards, so it must not be retained
	 * @param <R>             element type produced by {@code queryFunc}
	 * @throws IllegalArgumentException if {@code timeGranularity} is not one of the two supported values
	 */
	private <R> void processCreativeReportList(Collection<ReportAdvertiser> advertisers, LocalDate startDate, LocalDate endDate, String timeGranularity, BiFunction<JSONObject, Long, R> queryFunc, Consumer<List<R>> consumer) {
		final int maxTimeRage;
		if (CREATIVE_DAY_GRANULARITY.equals(timeGranularity)) {
			maxTimeRage = 30;
		} else if (CREATIVE_HOUR_GRANULARITY.equals(timeGranularity)) {
			maxTimeRage = 1;
		} else {
			throw new IllegalArgumentException("未知的创意报表查询粒度");
		}

		// Remember the overall range for the final summary; the per-window strings below change on every iteration.
		final String overallStart = startDate.format(DATE_FORMATTER), overallEnd = endDate.format(DATE_FORMATTER);
		int totalCount = 0;
		final List<R> list = new ArrayList<>(BATCH_SAVE_SIZE);
		while (startDate.compareTo(endDate) <= 0) {
			int timeCount = 0;
			// End of the current window, capped at the requested end date.
			LocalDate tempEndDate = startDate.plusDays(maxTimeRage - 1);
			if (tempEndDate.isAfter(endDate)) {
				tempEndDate = endDate;
			}
			final String sDateStr = startDate.format(DATE_FORMATTER);
			final String eDateStr = tempEndDate.format(DATE_FORMATTER);

			for (ReportAdvertiser advertiser : advertisers) {
				try {
					final long advertiserId = Long.parseLong(advertiser.getAdvertiserId());

					int page = 0;
					List<R> tempList = this.queryCreativeReportList(advertiserId, advertiser.getAccessToken(), sDateStr, eDateStr, timeGranularity, ++page, QUERY_PAGE_SIZE, queryFunc, 0);
					while (!tempList.isEmpty()) {
						for (R object : tempList) {
							list.add(object);
							++totalCount;
							++timeCount;
							if (list.size() >= BATCH_SAVE_SIZE) {
								try {
									consumer.accept(list);
								} finally {
									// Drop the batch even when saving failed; otherwise it would be re-submitted
									// (and duplicated) together with every later batch.
									list.clear();
								}
							}
						}
						tempList = this.queryCreativeReportList(advertiserId, advertiser.getAccessToken(), sDateStr, eDateStr, timeGranularity, ++page, QUERY_PAGE_SIZE, queryFunc, 0);
					}
				} catch (Exception e) {
					String error = String.format("同步 %s ~ %s 创意报表异常(advertiserId=%s)", sDateStr, eDateStr, null == advertiser ? null : advertiser.getAdvertiserId());
					log.error(error, e);
					XxlJobLogger.log(error);
				}
			}

			String success = String.format("同步 %s ~ %s 创意报表%d条数据", sDateStr, eDateStr, timeCount);
			log.info(success);
			XxlJobLogger.log(success);

			// Always move on to the next window, whether or not an advertiser failed.
			startDate = tempEndDate.plusDays(1);
		}
		// Hand over whatever is left from the last, incomplete batch.
		if (!list.isEmpty()) {
			consumer.accept(list);
			list.clear();
		}
		String success = String.format("总共同步 %s ~ %s 创意报表%d条数据", overallStart, overallEnd, totalCount);
		log.info(success);
		XxlJobLogger.log(success);
	}

	/**
	 * Fetches one page of the creative report for a single advertiser and maps every returned row.
	 * <p>
	 * A "too many requests" answer (code 40100) is retried after a one-second pause, at most three times.
	 * Any other failure - an empty HTTP response, a missing or non-zero result code, or no data list - is
	 * logged where applicable and reported to the caller as an empty list.
	 *
	 * @param advertiserId    advertiser to query
	 * @param accessToken     access token used for the request
	 * @param sDateStr        start date, formatted {@code yyyy-MM-dd}
	 * @param eDateStr        end date, formatted {@code yyyy-MM-dd}
	 * @param timeGranularity value sent as {@code time_granularity}
	 * @param page            page number to request
	 * @param pageSize        page size to request
	 * @param consumer        maps a raw row and the advertiser id to the returned element
	 * @param retryNo         number of retries already performed; negative values are treated as 0
	 * @param <R>             element type produced by {@code consumer}
	 * @return the mapped rows of the page, or an empty list when the page is empty or the call failed
	 */
	public <R> List<R> queryCreativeReportList(Long advertiserId, String accessToken, String sDateStr, String eDateStr, String timeGranularity, int page, int pageSize, BiFunction<JSONObject, Long, R> consumer, int retryNo) {
		retryNo = Math.max(retryNo, 0);
		JSONObject jsonBody = new JSONObject().fluentPut("advertiser_id", advertiserId).fluentPut("start_date", sDateStr).fluentPut("end_date", eDateStr).fluentPut("time_granularity", timeGranularity).fluentPut("fields", FIELDS_NAMES)
				.fluentPut("group_by", new String[]{"STAT_GROUP_BY_FIELD_ID", "STAT_GROUP_BY_FIELD_STAT_TIME"}).fluentPut("filtering", Collections.singletonMap("status", "CREATIVE_STATUS_ALL")).fluentPut("page", page).fluentPut("page_size", pageSize);
		String response = OEHttpUtils.getBuilder().uri(ttCreativeReportUri).accessToken(accessToken).contentType(MediaType.APPLICATION_JSON_VALUE).httpEntity(new StringEntity(jsonBody.toJSONString(), ContentType.APPLICATION_JSON)).request();
		if (StringUtils.isEmpty(response)) {
			String error = String.format("调用头条接口失败: param(advertiserId=%d, startDate=%s, endDate=%s, timeGranularity=%s, page=%d, pageSize=%d), result(http error)", advertiserId, sDateStr, eDateStr, timeGranularity, page, pageSize);
			log.warn(error);
			XxlJobLogger.log(error);
			return Collections.emptyList();
		}
		// Both the parsed object and its "code" may be absent; keep them nullable instead of unboxing blindly.
		final JSONObject result = JSON.parseObject(response);
		final Integer code = null == result ? null : result.getInteger("code");
		final String message = null == result ? null : result.getString("message");
		if (null != code && 40100 == code && retryNo < 3) {
			// Rate limited: wait and ask again, at most three times.
			String error = String.format("频繁调用头条接口失败: param(advertiserId=%d, startDate=%s, endDate=%s, timeGranularity=%s, page=%d, pageSize=%d), result(code=%d, message=%s)", advertiserId, sDateStr, eDateStr, timeGranularity, page, pageSize, code, message);
			log.warn(error);
			XxlJobLogger.log(error);
			this.sleep(1000);
			return queryCreativeReportList(advertiserId, accessToken, sDateStr, eDateStr, timeGranularity, page, pageSize, consumer, retryNo + 1);
		}
		if (null == code || 0 != code) {
			// Any other error, including an exhausted retry budget or a response without a code, ends this request.
			String error = String.format("未知调用头条接口失败: param(advertiserId=%d, startDate=%s, endDate=%s, timeGranularity=%s, page=%d, pageSize=%d), result(code=%d, message=%s)", advertiserId, sDateStr, eDateStr, timeGranularity, page, pageSize, code, message);
			log.warn(error);
			XxlJobLogger.log(error);
			return Collections.emptyList();
		}
		// Successful answer: map every row of data.list.
		final JSONObject data = result.getJSONObject("data");
		final JSONArray list = null == data ? null : data.getJSONArray("list");
		if (null == list) {
			return Collections.emptyList();
		}
		// ArrayList: callers read the page by position.
		final List<R> results = new ArrayList<>(list.size());
		for (int i = 0; i < list.size(); ++i) {
			results.add(consumer.apply(list.getJSONObject(i), advertiserId));
		}
		return results;
	}

	/**
	 * Pauses the current thread for the given time.
	 * <p>
	 * An interruption ends the pause early; the interrupt flag is set again so that callers can still observe it.
	 *
	 * @param timeoutMs pause length in milliseconds
	 */
	private void sleep(long timeoutMs) {
		try {
			Thread.sleep(timeoutMs);
		} catch (InterruptedException interrupted) {
			// Thread.sleep clears the flag when it throws, so restore it.
			final Thread self = Thread.currentThread();
			self.interrupt();
		}
	}

}
